Conversation

@MasterPtato
Contributor

No description provided.

@vercel

vercel bot commented Nov 18, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

Project: rivetkit-serverless, Deployment: Ready, Preview, Comment, Updated Nov 20, 2025 3:08am (UTC)

3 Skipped Deployments

rivet-cloud: Ignored, Updated Nov 20, 2025 3:08am
rivet-inspector: Ignored, Preview, Updated Nov 20, 2025 3:08am
rivet-site: Ignored, Preview, Updated Nov 20, 2025 3:08am

Contributor Author

MasterPtato commented Nov 18, 2025

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
Learn more


How to use the Graphite Merge Queue

Add the label merge-queue to this PR to add it to the merge queue.

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@claude

claude bot commented Nov 18, 2025

Pull Request Review: Batch Listen Feature

This PR introduces batch signal listening functionality to the workflow engine, allowing workflows to receive multiple signals in a single operation. This is a significant enhancement that can improve throughput and reduce overhead when dealing with multiple signals.

Summary

Changes: 1,215 additions, 732 deletions across 16 files
Core Feature: Transforms single-signal listening to batch-based signal processing


Positive Aspects

Architecture & Design

  • Well-structured approach: The batch listening is implemented at multiple layers (trait, context, database) with proper abstraction
  • Backwards compatibility: The single listen() method is preserved and now delegates to listen_n() with limit=1, maintaining existing API contracts
  • Consistent pattern: Both listen_with_timeout and listen_until variants support batch operations

Code Quality

  • Type safety: The Listen trait now returns Vec<Self>, enforcing that all signal types support batch operations
  • History tracking: New SignalsEvent properly tracks multiple signals with coordinated signal_ids, names, and bodies vectors
  • Error handling: Maintains existing error patterns while adapting to batch scenarios

Critical Issues

1. Breaking Change: Macro-generated Listen Implementation

Location: engine/packages/gasoline-macros/src/lib.rs:375-381

The #[signal] macro now generates a Listen implementation that returns Vec<Self>:

async fn listen(ctx: &mut gas::prelude::ListenCtx, limit: usize) -> gas::prelude::WorkflowResult<Vec<Self>> {
    ctx
        .listen_any(&[<Self as gas::signal::Signal>::NAME], limit)
        .await?
        .into_iter()
        .map(|signal| Self::parse(&signal.signal_name, &signal.body))
        .collect()
}

Issue: For a single signal type, this will return a Vec containing at most limit signals of the same type. However, if only 1 signal is available, it returns a 1-element vec. The consumer must now handle this differently.

Impact: All workflows using ctx.listen::<SomeSignal>() will need updates, or you need migration logic.

Recommendation: Document this breaking change clearly and consider:

  • Providing a migration guide for existing workflows
  • Adding deprecation warnings if keeping old single-signal methods temporarily

2. Inconsistent Limit Behavior

Location: engine/packages/gasoline/src/ctx/listen.rs:40-49

pub async fn listen_any(
    &mut self,
    signal_names: &[&'static str],
    limit: usize,
) -> WorkflowResult<Vec<SignalData>> {
    // ...
    if signals.is_empty() {
        return Err(WorkflowError::NoSignalFound(Box::from(signal_names)));
    }

Issue: The function fetches up to limit signals across ALL signal names, but the limit is applied per signal name in the database layer (line 1953 in db/kv/mod.rs). This creates ambiguity:

  • Does limit=5 mean 5 total signals, or 5 per signal type?
  • The current implementation fetches up to limit per signal type, then flattens them

Example Problem:

// Listening for 2 signal types with limit=5
// Could return up to 10 signals (5 per type)!
ctx.listen_any(&["signal_a", "signal_b"], 5).await?

Recommendation:

  • Clarify the limit semantics in documentation
  • Consider renaming to limit_per_type or enforcing a global limit by taking only the first N after flattening
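The ambiguity can be demonstrated with a standalone sketch, using plain integers as stand-ins for signal timestamps (hypothetical helpers, not the gasoline code):

```rust
// Hypothetical sketch: per-type limits flattened without a global cap can
// exceed the caller's requested limit; sorting then taking `limit` restores
// a total cap. The u64 values stand in for signal create_ts timestamps.
fn flatten_per_type(per_type: &[Vec<u64>], limit: usize) -> Vec<u64> {
    per_type
        .iter()
        .flat_map(|sigs| sigs.iter().take(limit).copied())
        .collect()
}

fn flatten_with_global_limit(per_type: &[Vec<u64>], limit: usize) -> Vec<u64> {
    let mut all = flatten_per_type(per_type, limit);
    all.sort(); // oldest first
    all.truncate(limit); // enforce a *total* limit
    all
}

fn main() {
    let type_a = vec![1, 2, 3, 4, 5]; // five signals of type A
    let type_b = vec![6, 7, 8, 9, 10]; // five signals of type B
    let types = [type_a, type_b];
    // Per-type limit of 5 across two types yields 10 signals in total...
    assert_eq!(flatten_per_type(&types, 5).len(), 10);
    // ...while a global cap returns exactly 5, oldest first.
    assert_eq!(flatten_with_global_limit(&types, 5), vec![1, 2, 3, 4, 5]);
}
```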

3. Missing Signal ID Tracking

Location: engine/packages/gasoline/src/history/event.rs:210-214

pub struct SignalsEvent {
    pub names: Vec<String>,
    pub bodies: Vec<Box<serde_json::value::RawValue>>,
}

Issue: The public SignalsEvent struct in the event history doesn't include signal_ids, but the debug version does (line 1535 in db/kv/debug.rs). This makes it impossible to:

  • Correlate signals to their original IDs in user code
  • Debug which specific signals were processed
  • Implement proper idempotency checks

Recommendation: Add pub signal_ids: Vec<Id> to the public SignalsEvent struct.

4. Data Consistency Risk in History

Location: engine/packages/gasoline/src/db/kv/debug.rs:1542-1577

The TryFrom<WorkflowHistoryEventBuilder> for SignalsEvent checks that vectors are non-empty but doesn't validate they have the same length:

Ok(SignalsEvent {
    signal_ids: if value.indexed_signal_ids.is_empty() {
        return Err(WorkflowError::MissingEventData("signal_id"));
    } else {
        value.indexed_signal_ids
    },
    names: if value.indexed_names.is_empty() {
        return Err(WorkflowError::MissingEventData("name"));
    } else {
        value.indexed_names
    },
    bodies: if value.indexed_input_chunks.is_empty() {
        // ...
    }
})

Issue: If the vectors have different lengths (due to corruption or partial writes), this could cause panics or incorrect signal matching.

Recommendation: Add validation:

ensure!(
    value.indexed_signal_ids.len() == value.indexed_names.len() 
        && value.indexed_names.len() == value.indexed_input_chunks.len(),
    "signal event data vectors must have equal length"
);

Moderate Issues

5. Error Context Loss

Location: engine/packages/gasoline/src/ctx/workflow.rs:707

pub async fn listen<T: Listen>(&mut self) -> Result<T> {
    let signals = self.listen_n::<T>(1).in_current_span().await?;
    signals
        .into_iter()
        .next()
        .context("must return at least 1 signal")
}

Issue: The error message "must return at least 1 signal" is generic and doesn't indicate which signal type was expected. This makes debugging harder.

Recommendation:

.with_context(|| format!("must return at least 1 signal for type {}", std::any::type_name::<T>()))

6. Metrics Removed Without Explanation

Location: engine/packages/gasoline/src/ctx/listen.rs (removed lines 28-42)

The SIGNAL_PULL_DURATION metric was removed, but it provided valuable insight into database performance.

Recommendation: Either restore the metric with batch-aware labels, or document why it was removed in the commit message.

7. Unused Loop Location Parameter

Location: engine/packages/gasoline/src/db/kv/mod.rs:1922

_loop_location: Option<&Location>,

Issue: The parameter is prefixed with _, indicating it's unused. If it's not needed, remove it from the trait definition.

Recommendation: Clean up unused parameters or document why they're reserved for future use.


Minor Issues & Suggestions

8. Display Implementation Inefficiency

Location: engine/packages/gasoline/src/history/event.rs:77-83

EventData::Signals(signals) => {
    let mut unique_names = signals.names.clone();
    unique_names.sort();
    unique_names.dedup();
    write!(f, "signals {:?}", unique_names.join(", "))
}

Suggestion: This allocates and clones for every display call. Consider using a HashSet or caching if displayed frequently.
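A sketch of the BTreeSet variant, using plain &str values as stand-ins for signals.names; a BTreeSet sorts and dedupes in one pass without cloning the whole vector:

```rust
use std::collections::BTreeSet;

// Sketch: dedupe and sort signal names via a BTreeSet of references,
// avoiding the clone + sort + dedup of a full Vec<String>.
fn format_signal_names(names: &[&str]) -> String {
    let unique: BTreeSet<&str> = names.iter().copied().collect();
    let joined: Vec<&str> = unique.into_iter().collect();
    format!("signals {}", joined.join(", "))
}

fn main() {
    assert_eq!(format_signal_names(&["b", "a", "b"]), "signals a, b");
}
```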

9. Naming: last_attempt vs last_try

Location: engine/packages/gasoline/src/ctx/listen.rs:17

The parameter was renamed from last_try to last_attempt. While more professional, ensure all documentation references are updated.

10. Test Coverage

Observation: No new test files were added for the batch listening functionality.

Recommendation: Add tests for:

  • Batch signal reception with multiple signals
  • Limit behavior with various signal counts
  • History replay with SignalsEvent
  • Edge cases (empty results, limit=0, etc.)

Security Considerations

11. Resource Exhaustion Risk

Location: Database signal fetching logic

Issue: The limit parameter is user-controlled (via listen_n()). If a workflow requests limit=1000000, it could:

  • Consume excessive memory
  • Cause database performance issues
  • Enable DoS attacks

Recommendation:

  • Add a maximum limit constant (e.g., MAX_BATCH_SIZE = 100)
  • Document the reasonable limit range
  • Consider adding metrics for large batch requests
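A minimal sketch of such a guard; the constant name and value are illustrative, not from the PR:

```rust
// Hypothetical cap on the user-supplied batch limit; 100 matches the
// example above but the real value would need benchmarking.
const MAX_SIGNAL_BATCH_SIZE: usize = 100;

fn clamp_batch_limit(requested: usize) -> usize {
    // A limit of 0 makes no sense for listen_n, so floor it at 1 as well.
    requested.clamp(1, MAX_SIGNAL_BATCH_SIZE)
}

fn main() {
    assert_eq!(clamp_batch_limit(0), 1);
    assert_eq!(clamp_batch_limit(5), 5);
    assert_eq!(clamp_batch_limit(1_000_000), MAX_SIGNAL_BATCH_SIZE);
}
```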

Performance Considerations

Positive

  • Batch operations reduce database round-trips
  • Parallel signal stream fetching (line 1941-1972 in db/kv/mod.rs) is well-designed

Potential Concerns

  • Multiple signal types with large limits could cause memory spikes
  • No pagination for very large signal backlogs

Documentation & Style

Per CLAUDE.md:

  • ✅ Uses structured logging correctly (e.g., tracing::info!(?x, "foo"))
  • ✅ Lowercase log messages
  • ✅ Proper error handling with anyhow::Result
  • ✅ Follows timestamp naming (create_ts)
  • ❌ Could use more inline documentation for complex functions like pull_next_signals

Recommendations

High Priority

  1. Clarify limit semantics - Document whether limit is per-signal-type or total
  2. Validate vector lengths in SignalsEvent deserialization
  3. Add signal_ids to public API for better debugging and idempotency
  4. Add integration tests for batch signal handling
  5. Document breaking changes for workflows using the old API

Medium Priority

  1. Restore or replace removed metrics
  2. Add maximum limit validation to prevent resource exhaustion
  3. Improve error messages with signal type information
  4. Clean up unused parameters

Low Priority

  1. Optimize Display implementation
  2. Add inline documentation for complex database operations

Conclusion

This is a well-architected feature that adds valuable batch processing capabilities to the workflow engine. The implementation is mostly solid with good separation of concerns and proper abstraction layers.

However, there are critical issues around limit semantics and data consistency that should be addressed before merging. The breaking API changes also need clear documentation and migration guidance.

Overall Assessment: Approve with requested changes

Recommendation: Address critical issues #1-4, then merge. Other issues can be follow-up PRs.


🤖 Generated with Claude Code

@claude

claude bot commented Nov 18, 2025

PR Review: feat(gas): add batch listen

Summary

This PR introduces batch signal listening to the Gasoline workflow engine, allowing workflows to listen for and process multiple signals at once instead of one at a time. This is a significant architectural improvement that can reduce latency and improve throughput for workflows that process multiple signals.


Code Quality & Best Practices

Strengths:

  • ✅ Good use of structured logging with tracing (e.g., tracing::debug!(?x, "foo") pattern)
  • ✅ Proper error handling with custom error types
  • ✅ Backward compatibility: Old EventType::Signal is properly converted to the new EventType::Signals format
  • ✅ Consistent naming conventions (snake_case, past tense for timestamps)
  • ✅ Good separation of concerns between database layer and workflow context

Issues & Suggestions:

  1. Missing signal_id in SignalsEvent (engine/packages/gasoline/src/history/event.rs:210-214)

    pub struct SignalsEvent {
        pub names: Vec<String>,
        pub bodies: Vec<Box<serde_json::value::RawValue>>,
    }

    The SignalsEvent struct is missing signal_ids: Vec<Id> field that appears to be used in the debug implementation and history keys. This could cause issues when replaying events or debugging.

  2. Potential ordering issue (engine/packages/gasoline/src/db/kv/mod.rs:2059-2063)

    // Sort by ts
    signals.sort_by_key(|key| key.create_ts);
    
    // Apply limit
    Ok(signals.into_iter().take(limit).collect())

    The limit is applied AFTER sorting, but signals are fetched with a limit from each signal name subspace. If you have 100 signals of type A and 1 of type B, and request limit=10, you might get all 10 from A and miss the one from B even if B is older. Consider sorting BEFORE fetching or using a priority queue approach.

  3. Deprecated metric removed (engine/packages/gasoline/src/metrics.rs)
    The SIGNAL_PULL_DURATION metric was removed but might still be valuable for performance monitoring. Consider keeping it and recording the total duration for pulling all signals in a batch.

  4. API Breaking Change
    The Listen trait now requires returning Vec<Self> instead of Self, and CustomListener trait was removed. This is a breaking change that should be:

    • Documented in a changelog/migration guide
    • Validated that all existing implementations are updated

Potential Bugs

  1. Empty signals vec guarantee (engine/packages/gasoline/src/ctx/workflow.rs:703-710)

    pub async fn listen<T: Listen>(&mut self) -> Result<T> {
        let signals = self.listen_n::<T>(1).in_current_span().await?;
    
        signals
            .into_iter()
            .next()
            .context("must return at least 1 signal")
    }

    The code assumes listen_n will never return an empty vec, but this isn't enforced by the type system. If listen_n returns empty, this call fails with the error "must return at least 1 signal". Consider adding an assertion in listen_n or handling this more gracefully.

  2. Race condition in listen_n_until (engine/packages/gasoline/src/ctx/workflow.rs:1154-1182)
    After a sleep is interrupted, the code checks for signals in history. However, if the history check finds HistoryResult::New instead of signals, it continues to listen for new signals. There's a potential edge case where signals could be missed between the sleep interruption and the history check.

  3. Buffer size assumption (engine/packages/gasoline/src/db/kv/mod.rs:2055)

    .buffer_unordered(1024)

    Hard-coded buffer of 1024 could be problematic if limit is much smaller or larger. Consider using buffer_unordered(limit.max(16).min(1024)) for better performance.
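The non-empty assumption in the first bug above can be made explicit with a small helper that names the expected signal type in the error (a hypothetical sketch in plain Rust, not the gasoline API):

```rust
// Hypothetical helper: surface an explicit, typed error instead of relying
// on listen_n's implicit non-empty guarantee.
fn first_signal<T>(signals: Vec<T>) -> Result<T, String> {
    signals.into_iter().next().ok_or_else(|| {
        format!(
            "listen_n returned no signals for type {}",
            std::any::type_name::<T>()
        )
    })
}

fn main() {
    assert_eq!(first_signal(vec![42u32]), Ok(42));
    // An empty batch yields a descriptive error rather than a panic.
    assert!(first_signal::<u32>(vec![]).is_err());
}
```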


Performance Considerations

Improvements:

  • ✅ Batching reduces database round trips
  • ✅ Using buffer_unordered for concurrent signal processing
  • ✅ Sorting and limiting happen after fetching, avoiding multiple DB queries

Concerns:

  1. Flattening signal streams (engine/packages/gasoline/src/db/kv/mod.rs:1941-1961)

    futures_util::stream::iter(owned_filter.clone())
        .map(|signal_name| { /* ... */ })
        .flatten()

    Using .flatten() on multiple range queries could lead to uneven fetching if one signal type has many more entries than others. The current approach fetches limit from EACH signal type, then sorts and takes limit total. This could fetch more data than needed.

  2. Multiple indexed key writes (engine/packages/gasoline/src/db/kv/keys/history.rs)
    The new implementation writes separate keys for:

    • Each signal ID (IndexedSignalIdKey)
    • Each signal name (IndexedNameKey)
    • Each signal input chunk (IndexedInputChunkKey)

    This could significantly increase write amplification. For N signals with M chunks each, you're writing ~N*(2+M) keys instead of N keys. Consider benchmarking the performance impact.


Security Concerns

  1. No validation on limit parameter
    The limit parameter in listen_n is not validated. A malicious or buggy caller could pass usize::MAX which could cause:

    • Excessive memory allocation
    • Database query timeouts
    • DoS conditions

    Recommend adding: let limit = limit.min(MAX_SIGNAL_BATCH_SIZE); with a reasonable constant.

  2. Signal body size not validated
    When combining chunks into signal bodies, there's no validation that the total size is reasonable. Large signal bodies could cause OOM issues.
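A size guard when reassembling chunked bodies could look like the following sketch; the constant, helper name, and cap value are illustrative, not from the PR:

```rust
// Sketch of a size guard when reassembling chunked signal bodies; the 4 MiB
// cap is an illustrative value, not from the PR.
const MAX_SIGNAL_BODY_BYTES: usize = 4 * 1024 * 1024;

fn combine_chunks(chunks: &[&[u8]]) -> Result<Vec<u8>, String> {
    // Validate the total size before allocating, to fail fast on oversized bodies.
    let total: usize = chunks.iter().map(|c| c.len()).sum();
    if total > MAX_SIGNAL_BODY_BYTES {
        return Err(format!(
            "signal body too large: {total} bytes (max {MAX_SIGNAL_BODY_BYTES})"
        ));
    }
    let mut body = Vec::with_capacity(total);
    for chunk in chunks {
        body.extend_from_slice(chunk);
    }
    Ok(body)
}

fn main() {
    assert_eq!(combine_chunks(&[&b"ab"[..], &b"cd"[..]]).unwrap(), b"abcd".to_vec());
    let big = vec![0u8; MAX_SIGNAL_BODY_BYTES + 1];
    assert!(combine_chunks(&[&big]).is_err());
}
```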


Test Coverage

Missing:

  • ❌ No test files modified in this PR
  • ❌ No tests for listen_n functionality
  • ❌ No tests for batch signal processing
  • ❌ No tests for the new SignalsEvent type
  • ❌ No tests for backward compatibility (old Signal → new Signals conversion)
  • ❌ No tests for limit parameter edge cases (0, 1, large numbers)
  • ❌ No tests for signal ordering when batching multiple signal types

Recommended Tests:

// Test batch listen with multiple signals
#[tokio::test]
async fn test_listen_n_multiple_signals() {
    // Send 5 signals, listen for 3, verify we get the oldest 3
}

// Test listen_n with limit larger than available signals
#[tokio::test]  
async fn test_listen_n_limit_exceeds_available() {
    // Send 2 signals, listen for 5, verify we get 2
}

// Test backward compatibility
#[tokio::test]
async fn test_old_signal_event_conversion() {
    // Load workflow with old Signal events, verify they convert to Signals
}

// Test signal ordering across types
#[tokio::test]
async fn test_listen_n_mixed_signal_types() {
    // Send signals of type A and B interleaved by time
    // Verify they're received in chronological order
}

Architecture & Design

Positive:

  • The batch approach is a good architectural decision for high-throughput scenarios
  • Clean separation between listen() (single) and listen_n() (batch) maintains backward compatibility at the API level
  • The history cursor changes properly support the new batch model

Questions:

  1. Why was CustomListener removed? Were there no use cases for it, or does batch listening subsume its functionality?
  2. Should there be a way to listen for "all available signals up to N" vs "exactly N signals or error"? The current implementation errors if fewer signals are available.

Summary & Recommendations

This is a well-structured PR that adds valuable batching capabilities to the workflow engine. However, it needs:

Must Fix:

  1. Add signal_ids field to SignalsEvent struct
  2. Add validation for limit parameter to prevent DoS
  3. Fix the ordering issue in signal fetching (fetch limit total, not limit per type)
  4. Add comprehensive test coverage

Should Fix:

  1. Consider keeping SIGNAL_PULL_DURATION metric
  2. Document the breaking changes to Listen trait
  3. Add buffer size tuning based on limit
  4. Add signal body size validation

Nice to Have:

  1. Add migration guide for users of old CustomListener trait
  2. Benchmark write amplification impact
  3. Consider exposing batch size as a configuration parameter

The core implementation is solid, but the missing tests and potential edge cases around limits and ordering need to be addressed before merging.


Generated by Claude Code

@claude

claude bot commented Nov 18, 2025

Code Review

Summary

This PR adds batch signal listening capabilities to the Gasoline workflow engine, allowing workflows to receive multiple signals in a single operation instead of one at a time. This is a significant architectural improvement that should improve efficiency when dealing with high-volume signal processing.

Architecture Changes

Well-designed changes:

  1. Signal History Consolidation: The new SignalsEvent replaces individual signal events in history, batching multiple signals into a single event. This is more efficient and cleaner than the previous approach.

  2. Indexed Storage: The implementation uses indexed keys (IndexedSignalIdKey, IndexedNameKey, IndexedInputKey) to store multiple signals within a single history event, which is a clean database schema design.

  3. Backward Compatibility: The PR properly deprecates the old Signal event type (EventType::Signal = 1) while introducing Signals (EventType::Signals = 10), maintaining compatibility.

Code Quality

Strengths:

  1. Consistent API Design: The new listen_n() and listen_n_with_timeout() methods follow the existing naming conventions and patterns.

  2. Proper Error Handling: Maintains proper error handling throughout, including history divergence checks in compare_signals().

  3. Metrics & Observability: Maintains logging and metrics (signal receive lag tracking, etc.).

  4. Macro Updates: The #[signal] macro and join_signal! macro were properly updated to return Vec<Self> instead of single values.

Potential Issues

⚠️ Critical Issues:

1. EventType Mismatch in insert::signals_event (packages/gasoline/src/db/kv/keys/history.rs:1645)

pub fn signals_event(/* ... */) -> Result<()> {
    common(
        subspace,
        tx,
        workflow_id,
        location,
        EventType::Signal,  // ❌ Should be EventType::Signals
        version,
        create_ts,
    )
}

This appears to use EventType::Signal (the deprecated type) instead of EventType::Signals. This could cause history corruption or replay issues.

Recommended fix:

EventType::Signals,  // Use the new batch signal type

2. Missing Validation on Signal Count

The SignalsEvent struct (packages/gasoline/src/history/event.rs:211-214) stores names and bodies as separate vectors but does not enforce that they have the same length:

pub struct SignalsEvent {
    pub names: Vec<String>,
    pub bodies: Vec<Box<serde_json::value::RawValue>>,
}

In the database deserialization code (packages/gasoline/src/db/kv/debug.rs:1545), there is a zip operation that could silently drop data if lengths do not match. Consider adding validation:

pub struct SignalsEvent {
    pub names: Vec<String>,
    pub bodies: Vec<Box<serde_json::value::RawValue>>,
}

impl SignalsEvent {
    pub fn new(names: Vec<String>, bodies: Vec<Box<serde_json::value::RawValue>>) -> Result<Self> {
        ensure!(names.len() == bodies.len(), "signals names and bodies must have same length");
        ensure!(!names.is_empty(), "signals event must contain at least one signal");
        Ok(Self { names, bodies })
    }
}

⚠️ Medium Priority Issues:

3. Removed CustomListener Without Clear Migration Path

The PR removes the CustomListener trait and custom_listener() method entirely:

  • Removed from packages/gasoline/src/ctx/versioned_workflow.rs (lines 128-136)
  • Not mentioned in PR description

If any users were using CustomListener, this is a breaking change. Consider:

  1. Adding a deprecation note/migration guide
  2. Ensuring no internal code was using this functionality
  3. Documenting this breaking change

4. Potential Performance Issue: Sort After Parallel Fetch

In pull_next_signals (packages/gasoline/src/db/kv/mod.rs:2059-2063):

.buffer_unordered(1024)
.try_collect::<Vec<_>>()
.await?;

// Sort by ts
signals.sort_by_key(|key| key.create_ts);

// Apply limit
Ok(signals.into_iter().take(limit).collect())

The code fetches up to limit signals from each signal name stream in parallel, then sorts all results by timestamp, then takes limit items. This means:

  • If listening for N signal types with limit=10, you might fetch 10*N signals then discard all but 10
  • The buffer_unordered(1024) suggests very high parallelism but the final limit might waste most of that work

Recommendation: Consider limiting the total fetch earlier or documenting the tradeoff.

Performance Considerations

Good:

  • Batching signals reduces database round-trips
  • Parallel signal body fetching with buffer_unordered(1024)
  • Maintains existing polling/subscription mechanisms

⚠️ Concern:

  • The listen_n semantics could lead to workflows requesting large batches unnecessarily if not used carefully
  • No apparent safeguard against very large batch sizes (could the limit parameter be validated against a max?)

Security

No obvious security concerns

  • Maintains existing workflow isolation
  • No new injection vectors
  • Proper deserialization error handling

Test Coverage

Critical Gap: No Tests for Batch Listening

The PR adds significant new functionality but I could not find any tests for:

  • listen_n() functionality
  • listen_n_with_timeout()
  • listen_n_until()
  • Multiple signals in a single SignalsEvent
  • History replay with batched signals

Recommendation: Add tests covering:

  1. Basic listen_n(limit=5) with multiple signals arriving
  2. Behavior when fewer than limit signals arrive
  3. Correct ordering of batched signals by timestamp
  4. History replay with SignalsEvent containing multiple signals
  5. Timeout behavior with listen_n_with_timeout

Example test case:

#[workflow_test]
async fn test_batch_signal_listen() {
    // Publish 5 signals
    // ctx.listen_n::<MySignal>(10) should return all 5
    // Verify order matches timestamps
}

Additional Observations

  1. Good: The listen() method is now implemented as listen_n(1) which is a clean refactor that ensures consistency.

  2. Good: Updated print/debug utilities to handle the new SignalsEvent type.

  3. Question: The PR description is empty. Consider adding:

    • Why batch listening is needed
    • Performance improvement metrics (if available)
    • Breaking changes documentation
    • Migration guide for CustomListener users

Recommendations

Must Fix:

  1. Fix EventType::Signal → EventType::Signals in insert::signals_event
  2. Add validation to SignalsEvent to ensure names.len() == bodies.len()
  3. Add comprehensive test coverage for batch listening

Should Consider:

  1. Add PR description explaining the changes and migration path
  2. Document the breaking change (removal of CustomListener)
  3. Review the fetch-all-then-limit performance pattern in pull_next_signals
  4. Add max batch size validation to prevent abuse

Nice to Have:

  1. Add examples of using listen_n in documentation
  2. Performance benchmarks showing improvement over sequential signal listening

Verdict

This is a well-architected feature that makes logical sense for the workflow engine. However, it has a critical bug (wrong EventType) and lacks test coverage. Once these issues are addressed, this will be a solid improvement.

Status: Needs Changes 🔴

The EventType bug could cause production issues with history replay, and the lack of tests makes it risky to merge. Please address the critical issues before merging.

@claude

claude bot commented Nov 19, 2025

PR Review: Add Batch Listen

Overview

This PR introduces batch signal listening capabilities to the Gasoline workflow engine, allowing workflows to receive multiple signals in a single operation.

Strengths

1. Well-Structured API Changes

  • Listen trait now returns Vec<Self> - clean abstraction
  • Backward compatibility via listen() calling listen_n(1)
  • Clear batch semantics with listen_n() and listen_n_with_timeout()

2. Comprehensive Database Implementation

  • Well-designed indexed signal storage keys (IndexedSignalIdKey, IndexedNameKey, IndexedInputKey)
  • Proper chunked keys for large signal bodies
  • SignalsEvent type properly supports multiple signals

3. Good Error Handling

  • Empty vecs trigger NoSignalFound appropriately (ctx/listen.rs:68)
  • History divergence checks validate signal events (history/cursor.rs:561-590)

4. Logging and Observability

  • Structured logging with signal counts, IDs, names
  • Per-signal metrics for receive lag
  • Enhanced debug printing (engine/src/util/wf/mod.rs:470-497)

Concerns

1. Breaking API Change ⚠️
Listen trait signature changed. Any custom implementations will break.

Recommendations:

  • Document as breaking change in PR description
  • Search for manual Listen implementations
  • Add migration notes

2. Potential Index Corruption 🔴
db/kv/debug.rs:897-940 rebuilds events from indexed keys with ordering checks. Partial transaction failures could cause corruption on replay.

Recommendations:

  • Add integration tests for partial write failures
  • Consider repair/recovery logic
  • Document invariants

3. Empty Vec Semantics ⚠️
Inconsistent behavior:

  • listen_n_with_timeout() → empty vec valid (timeout)
  • listen_n() → empty vec errors

Recommendation: Document when empty vecs are expected vs errors
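One way to document the distinction is to encode it in the return type rather than overloading an empty Vec; a hypothetical sketch, not the PR's actual API:

```rust
// Hypothetical return shape that makes the timeout case explicit instead of
// using an empty Vec to mean "timed out"; not the PR's actual API.
enum ListenOutcome<T> {
    Signals(Vec<T>),
    TimedOut,
}

fn describe<T>(outcome: &ListenOutcome<T>) -> String {
    match outcome {
        ListenOutcome::Signals(sigs) => format!("received {} signal(s)", sigs.len()),
        ListenOutcome::TimedOut => "timed out with no signals".to_string(),
    }
}

fn main() {
    assert_eq!(describe(&ListenOutcome::Signals(vec![1, 2])), "received 2 signal(s)");
    assert_eq!(describe::<u32>(&ListenOutcome::TimedOut), "timed out with no signals");
}
```

With this shape, listen_n_with_timeout could return TimedOut while listen_n keeps its error on empty, making the two behaviors impossible to confuse at the call site.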

4. Test Coverage Gaps 🟡
Missing tests for:

  • Batch listening with n > 1
  • Mixed signal types in batches
  • Replay with batch signals
  • Edge cases (more requested than available)

5. Performance 💭
Sequential processing of signals for metrics/logging could add latency for large batches.

6. Magic Number 🟡
recv_lag > 3.0 check needs constant and explanation (ctx/listen.rs:82)

7. Removed EventData::Empty
Was Empty unused or does this affect backfilling?

Security & Safety

✅ No security concerns
✅ Proper transactions and conflict ranges
✅ Signal ack timestamps correct

Conclusion

Well-implemented feature! Main concerns:

  1. Breaking change documentation needed
  2. Index corruption risk - needs testing
  3. Batch-specific test coverage needed

Recommendation: Add breaking change docs and batch tests before merging.

Overall: Approve with comments 🎉

@claude

claude bot commented Nov 19, 2025

PR Review: feat(gas): add batch listen

This PR introduces batch signal listening to the Gasoline workflow engine, allowing workflows to receive multiple signals in a single operation. This is a significant performance optimization.

Architecture Changes

Core API Changes:

  • Listen trait now returns Vec<Self> and accepts a limit parameter
  • WorkflowCtx::listen() now internally calls listen_n(1)
  • New methods: listen_n(), listen_n_with_timeout(), and listen_n_until()
  • Removed CustomListener trait

Database Layer:

  • New pull_next_signals() method with parallel signal fetching
  • Signals sorted by timestamp and limited to requested count
  • New EventType::Signals replaces individual signal events

History Management:

  • New SignalsEvent with indexed data structures
  • New history keys for indexed signals
  • New cursor method: compare_signals()

Code Quality - Strengths

  • Excellent use of parallel processing with buffer_unordered(1024)
  • Proper transaction boundaries with conflict ranges
  • Maintains backward compatibility
  • Comprehensive tracing and metrics
  • Proper error propagation

Areas for Improvement

  1. Transaction Size (db/kv/mod.rs:1988-2057): Large batches could create very large transactions. Consider transaction size limits.

  2. Sorting After Limit (db/kv/mod.rs:2059-2063): Fetches limit signals from each stream, then sorts and limits again. Could fetch more than needed. Add clarifying comment or adjust logic.

  3. Error Context (ctx/workflow.rs:707-708): Use expect() instead of context() since empty result indicates a bug.

Potential Bugs

  1. Index Ordering (db/kv/debug.rs:901-927): Assumes indices arrive in order. Use HashMap for robustness.

  2. Empty Signals Vec (ctx/workflow.rs:1165-1177): When replaying with SleepState::Interrupted, code assumes non-empty vec. Add validation.
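The index-ordering assumption in the first bug above can be avoided by keying each value on its index and letting an ordered map reassemble them; a standalone sketch with hypothetical data shapes:

```rust
use std::collections::BTreeMap;

// Sketch of rebuilding indexed values without assuming arrival order: each
// entry carries its index, and a BTreeMap yields values sorted by index.
// The (usize, &str) pairs are stand-ins for the PR's indexed history keys.
fn rebuild_in_order(entries: &[(usize, &str)]) -> Vec<String> {
    let map: BTreeMap<usize, &str> = entries.iter().copied().collect();
    map.into_values().map(|s| s.to_string()).collect()
}

fn main() {
    // Indices arriving out of order are still reassembled correctly.
    let entries = [(2, "c"), (0, "a"), (1, "b")];
    assert_eq!(rebuild_in_order(&entries), vec!["a", "b", "c"]);
}
```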

Performance

Positive:

  • Parallel signal fetching is excellent
  • Single transaction reduces round trips

Concerns:

  • No bounds on signal body sizes (memory pressure)
  • Metrics recorded in loop for each signal (could be slow for large batches)

Security

No significant issues. Proper conflict ranges prevent races, signal ack prevents double-delivery.

Test Coverage

Missing: No test files in PR. Critical tests needed:

  1. listen_n() with limit > available signals
  2. Signal ordering with mixed timestamps
  3. History replay with batched signals
  4. listen_n_until() timeout scenarios
  5. Transaction conflicts during batch pull

Best Practices

Follows CLAUDE.md conventions well (structured logging, lowercase messages, anyhow::Result, proper timestamps).

Minor: Some functions exceed reasonable length (180 lines). Consider extracting helpers.

Final Recommendation

APPROVE with minor recommendations

Well-architected change with meaningful performance improvements. Code quality is high and follows conventions.

Before merging:

  1. Add integration tests
  2. Address empty signals vector edge case
  3. Add clarifying comments for per-stream limit logic
  4. Consider transaction size limits

Nice work! The parallel signal fetching is particularly well done.

@claude
Copy link

claude bot commented Nov 20, 2025

PR Review: feat(gas): add batch listen

I've completed a thorough review of this PR. Overall, this is a well-architected change that adds batch signal listening to the workflow engine.

Strengths

Architecture & Design:

  • Clean API design that maintains backward compatibility while adding batching capability
  • The listen() method is now built on top of listen_n() with limit=1, avoiding code duplication
  • Good separation of concerns between Listen trait changes and workflow context implementation
  • The new Signals event type properly integrates into the existing event history system

Implementation Quality:

  • Proper error handling throughout the batch operations
  • Consistent logging with structured fields
  • Transaction safety in pull_next_signals with proper conflict range handling
  • Good use of parallel processing with buffer_unordered(1024) for signal fetching

Database Layer:

  • Efficient batching at the database level with proper index management
  • Correct handling of wake conditions on the last attempt
  • Proper sorting by timestamp and application of limits

Issues & Concerns

1. Missing signal_ids field in SignalsEvent (history/event.rs:211-214)

The SignalsEvent struct only has names and bodies, but the debug version includes signal_ids. This inconsistency could cause issues when replaying history or debugging workflows.

Recommendation: Add a signal_ids vector (matching the debug event's field) to the main SignalsEvent struct for consistency.
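A hedged sketch of the recommended shape; only names, bodies, and the proposed signal_ids field come from the review, while the Id and body types are stand-ins for whatever the crate actually uses.

```rust
// Stand-in for the engine's signal id type.
type Id = u128;

#[derive(Debug)]
struct SignalsEvent {
    names: Vec<String>,
    bodies: Vec<Vec<u8>>, // serialized signal bodies
    signal_ids: Vec<Id>,  // proposed: one id per received signal
}

impl SignalsEvent {
    // The three vectors are parallel; carrying ids lets replay and debug
    // tooling correlate entries without a separate lookup.
    fn is_consistent(&self) -> bool {
        self.names.len() == self.bodies.len() && self.bodies.len() == self.signal_ids.len()
    }
}

fn main() {
    let ev = SignalsEvent {
        names: vec!["order_created".into()],
        bodies: vec![b"{}".to_vec()],
        signal_ids: vec![1],
    };
    assert!(ev.is_consistent());
}
```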

2. Potential ordering issue in listen_n_until

When signals are received during a timeout, they may not maintain consistent ordering across different code paths.

Recommendation: Document whether signal order within a batch is guaranteed.

3. Empty Vec handling could be clearer (ctx/workflow.rs:706-708)

If listen_n returns an empty vec due to a bug, this fails with a generic context error rather than a typed workflow error.

Recommendation: Use WorkflowError instead of generic context for better error handling.

4. Missing validation in pull_next_signals

The limit parameter is used directly without validation. If limit is 0, this could cause unexpected behavior.

Recommendation: Add validation at the API boundary to ensure limit > 0.
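A minimal sketch of the suggested boundary check; the function name and error representation are hypothetical, not from the PR.

```rust
// Reject limit == 0 at the API boundary instead of letting it reach the
// database layer, where it would produce an empty (or undefined) pull.
fn validate_limit(limit: usize) -> Result<usize, String> {
    if limit == 0 {
        Err("listen_n limit must be greater than zero".into())
    } else {
        Ok(limit)
    }
}

fn main() {
    assert!(validate_limit(0).is_err());
    assert_eq!(validate_limit(16), Ok(16));
}
```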

5. Deprecated Signal event type

The old Signal event type is marked deprecated but may not be handled in all match statements.

Recommendation: Verify all match statements handle the deprecated type appropriately.

Performance Considerations

Positive:

  • Batching reduces database round-trips significantly
  • Parallel signal fetching is well-optimized
  • Efficient use of streaming modes in UniversalDB queries

Potential concerns:

  • Memory usage could spike if workflows listen for very large signal batches

Recommendation: Consider adding configuration for max batch size or document recommended limits.

Security

No security concerns identified. The implementation properly validates versions during history replay and uses appropriate transaction isolation levels.

Test Coverage

Needs attention:

  • No test files modified in this PR
  • Batch listening is a significant new feature that should have explicit test coverage
  • Edge cases to test: limit=0, limit > available signals, mixed signal types, timeout behavior, history replay, signal ordering

Recommendation: Add comprehensive tests for the new batch functionality.

Summary

This is a well-implemented feature that significantly improves the workflow engine's signal handling capabilities. The main concerns are:

  1. Missing signal_ids in the public SignalsEvent struct (data inconsistency)
  2. Lack of test coverage for the new functionality
  3. Minor parameter validation gaps

Overall Assessment: Approve with recommendations to address the issues above, particularly adding the signal_ids field and test coverage.


Great work on maintaining code quality and following the project's conventions!

@claude
Copy link

claude bot commented Nov 20, 2025

PR Review: feat(gas): add batch listen

Summary

This PR adds batch listening functionality to the Gasoline workflow engine, allowing workflows to listen for and process multiple signals at once. This is a significant architectural improvement that reduces database transactions and improves throughput.

Code Quality ✅

  • Clean API design maintaining backward compatibility
  • Well-structured indexed database keys for batch signal storage
  • Consistent naming (listen_n, listen_n_with_timeout, listen_n_until)
  • Good use of Rust iterators and functional patterns

Potential Issues ⚠️

1. Empty Vector Handling (Critical)

In workflow.rs:703, listen() calls .context() on an Option, which yields only a generic error on None. If listen_n returns an empty vector, the caller gets no useful diagnostics. Use .ok_or_else() with a dedicated error instead.
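A sketch of the suggested change with stand-in types; listen() delegating to listen_n with limit=1 comes from the PR, while the error enum and function shape here are hypothetical.

```rust
#[derive(Debug, PartialEq)]
enum WorkflowError {
    EmptyListenResults,
}

// listen() delegates to listen_n(limit = 1); an empty vec here indicates a
// bug in the engine, so surface it as a dedicated workflow error rather
// than a generic context message.
fn listen_one(mut batch: Vec<String>) -> Result<String, WorkflowError> {
    batch.pop().ok_or(WorkflowError::EmptyListenResults)
}

fn main() {
    assert_eq!(listen_one(vec!["sig".into()]), Ok("sig".into()));
    assert_eq!(listen_one(vec![]), Err(WorkflowError::EmptyListenResults));
}
```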

2. Limit Application

In kv/mod.rs:2063, limit applies per signal name stream (could fetch limit*num_names), then takes limit after sorting. Document if intentional.

3. Buffer Size

buffer_unordered(1024) at kv/mod.rs:2055 needs documentation explaining the choice.
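One way to act on this suggestion, sketched with an illustrative constant; the rationale in the comment is an assumption, not taken from the PR.

```rust
/// Max signal fetches in flight at once. Bounds memory while keeping the
/// database pipeline full; each in-flight fetch holds a deserialized signal
/// body, so raise this cautiously. (Value and rationale are illustrative.)
const SIGNAL_FETCH_CONCURRENCY: usize = 1024;

fn main() {
    // A named constant with a doc comment replaces the bare 1024 literal
    // passed to buffer_unordered.
    assert_eq!(SIGNAL_FETCH_CONCURRENCY, 1024);
}
```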

4. History Compatibility

Ensure compare_signals handles transition from Signal to Signals events and maintains backward compatibility.

Performance ⚡

Positives: Batch processing reduces DB round trips, parallel fetching with buffer_unordered
Concerns: Large batches increase memory, consider adding batch size metrics

Breaking Changes 🔴

  • Listen::listen now returns Vec&lt;Self&gt; instead of Self
  • CustomListener trait removed
  • Migration impact: custom Listen implementations will break

Test Coverage 🧪

Missing tests for: batch limits, mixed signal types, empty batches, history replay, race conditions, backward compatibility

Recommendations

High Priority:

  • Fix empty vector handling in listen()
  • Add comprehensive batch listening tests
  • Document breaking changes
  • Clarify limit behavior

Medium Priority:

  • Add batch size metrics
  • Document buffer_unordered choice
  • Test compare_signals thoroughly

Overall Assessment ✅⚠️

Well-architected improvement with clean design. Address edge case handling, test coverage, and documentation before merging. Breaking changes are necessary and well-scoped.

Great work maintaining consistency with existing patterns!
